From 5fec4cf7660cbcd50a016750b5de37394c6a65b1 Mon Sep 17 00:00:00 2001 From: "kaf24@firebug.cl.cam.ac.uk" Date: Sun, 9 Oct 2005 23:53:03 +0100 Subject: [PATCH] Simplify reply logic in xenstored. Maintain a linked list of pending replies that are sent out in order. Currently we only read new requests when the reply list is empty. In fact there is no good reason for this restriction. Another interesting point is that (on my test machine) hotplug blk setup fails if xenstored_client connects to xenstored via the unix domain socket rather than through the kernel --- this points to some user/kernel races that are 'fixed' by the extra serialisation of the in-kernel mutexes. It definitely needs looking into. Signed-off-by: Keir Fraser --- tools/xenstore/xenstored_core.c | 117 ++++++++++++++---------------- tools/xenstore/xenstored_core.h | 33 +++++---- tools/xenstore/xenstored_domain.c | 6 +- tools/xenstore/xenstored_watch.c | 87 ++++++---------------- tools/xenstore/xenstored_watch.h | 10 +-- xen/include/public/io/xs_wire.h | 78 ++++++++++---------- 6 files changed, 136 insertions(+), 195 deletions(-) diff --git a/tools/xenstore/xenstored_core.c b/tools/xenstore/xenstored_core.c index 25c9b03bb8..4c998bea70 100644 --- a/tools/xenstore/xenstored_core.c +++ b/tools/xenstore/xenstored_core.c @@ -235,52 +235,50 @@ void trace(const char *fmt, ...) talloc_free(str); } -static bool write_message(struct connection *conn) +static bool write_messages(struct connection *conn) { int ret; - struct buffered_data *out = conn->out; - - if (out->inhdr) { - if (verbose) - xprintf("Writing msg %s (%s) out to %p\n", - sockmsg_string(out->hdr.msg.type), - out->buffer, conn); - ret = conn->write(conn, out->hdr.raw + out->used, - sizeof(out->hdr) - out->used); + struct buffered_data *out, *tmp; + + list_for_each_entry_safe(out, tmp, &conn->out_list, list) { + if (out->inhdr) { + if (verbose) + xprintf("Writing msg %s (%s) out to %p\n", + sockmsg_string(out->hdr.msg.type), + out->buffer, conn); + ret = conn->write(conn, out->hdr.raw + out->used, + sizeof(out->hdr) - out->used); + if (ret < 0) + return false; + + out->used += ret; + if (out->used < sizeof(out->hdr)) + return true; + + out->inhdr = false; + out->used = 0; + + /* Second write might block if non-zero. */ + if (out->hdr.msg.len && !conn->domain) + return true; + } + + ret = conn->write(conn, out->buffer + out->used, + out->hdr.msg.len - out->used); + if (ret < 0) return false; out->used += ret; - if (out->used < sizeof(out->hdr)) + if (out->used != out->hdr.msg.len) return true; - out->inhdr = false; - out->used = 0; + trace_io(conn, "OUT", out); - /* Second write might block if non-zero. */ - if (out->hdr.msg.len && !conn->domain) - return true; + list_del(&out->list); + talloc_free(out); } - ret = conn->write(conn, out->buffer + out->used, - out->hdr.msg.len - out->used); - - if (ret < 0) - return false; - - out->used += ret; - if (out->used != out->hdr.msg.len) - return true; - - trace_io(conn, "OUT", out); - conn->out = NULL; - talloc_free(out); - - queue_next_event(conn); - - /* No longer busy? */ - if (!conn->out) - conn->state = OK; return true; } @@ -297,9 +295,9 @@ static int destroy_conn(void *_conn) FD_SET(conn->fd, &set); none.tv_sec = none.tv_usec = 0; - while (conn->out + while (!list_empty(&conn->out_list) && select(conn->fd+1, NULL, &set, NULL, &none) == 1) - if (!write_message(conn)) + if (!write_messages(conn)) break; close(conn->fd); } @@ -326,9 +324,9 @@ static int initialize_set(fd_set *inset, fd_set *outset, int sock, int ro_sock) list_for_each_entry(i, &connections, list) { if (i->domain) continue; - if (i->state == OK) + if (list_empty(&i->out_list)) FD_SET(i->fd, inset); - if (i->out) + if (!list_empty(&i->out_list)) FD_SET(i->fd, outset); if (i->fd > max) max = i->fd; @@ -594,14 +592,7 @@ void send_reply(struct connection *conn, enum xsd_sockmsg_type type, bdata->hdr.msg.len = len; memcpy(bdata->buffer, data, len); - /* There might be an event going out now. Queue behind it. */ - if (conn->out) { - assert(conn->out->hdr.msg.type == XS_WATCH_EVENT); - assert(!conn->waiting_reply); - conn->waiting_reply = bdata; - } else - conn->out = bdata; - conn->state = BUSY; + list_add_tail(&bdata->list, &conn->out_list); } /* Some routines (write, mkdir, etc) just need a non-error return */ @@ -1148,8 +1139,6 @@ static void consider_message(struct connection *conn) enum xsd_sockmsg_type volatile type = conn->in->hdr.msg.type; jmp_buf talloc_fail; - assert(conn->state == OK); - /* For simplicity, we kill the connection on OOM. */ talloc_set_fail_handler(out_of_mem, &talloc_fail); if (setjmp(talloc_fail)) { @@ -1186,10 +1175,7 @@ end: static void handle_input(struct connection *conn) { int bytes; - struct buffered_data *in; - - assert(conn->state == OK); - in = conn->in; + struct buffered_data *in = conn->in; /* Not finished header yet? */ if (in->inhdr) { @@ -1237,7 +1223,7 @@ bad_client: static void handle_output(struct connection *conn) { - if (!write_message(conn)) + if (!write_messages(conn)) talloc_free(conn); } @@ -1254,8 +1240,6 @@ struct connection *new_connection(connwritefn_t *write, connreadfn_t *read) if (!new) return NULL; - new->state = OK; - new->out = new->waiting_reply = NULL; new->fd = -1; new->id = 0; new->domain = NULL; @@ -1263,6 +1247,7 @@ struct connection *new_connection(connwritefn_t *write, connreadfn_t *read) new->write = write; new->read = read; new->can_write = true; + INIT_LIST_HEAD(&new->out_list); INIT_LIST_HEAD(&new->watches); talloc_set_fail_handler(out_of_mem, &talloc_fail); @@ -1317,23 +1302,17 @@ void dump_connection(void) list_for_each_entry(i, &connections, list) { printf("Connection %p:\n", i); printf(" state = %s\n", - i->state == OK ? "OK" - : i->state == BUSY ? "BUSY" - : "INVALID"); + list_empty(&i->out_list) ? "OK" : "BUSY"); if (i->id) printf(" id = %i\n", i->id); if (!i->in->inhdr || i->in->used) printf(" got %i bytes of %s\n", i->in->used, i->in->inhdr ? "header" : "data"); +#if 0 if (i->out) printf(" sending message %s (%s) out\n", sockmsg_string(i->out->hdr.msg.type), i->out->buffer); - if (i->waiting_reply) - printf(" ... and behind is queued %s (%s)\n", - sockmsg_string(i->waiting_reply->hdr.msg.type), - i->waiting_reply->buffer); -#if 0 if (i->transaction) dump_transaction(i); if (i->domain) @@ -1604,3 +1583,13 @@ int main(int argc, char *argv[]) max = initialize_set(&inset, &outset, *sock, *ro_sock); } } + +/* + * Local variables: + * c-file-style: "linux" + * indent-tabs-mode: t + * c-indent-level: 8 + * c-basic-offset: 8 + * tab-width: 8 + * End: + */ diff --git a/tools/xenstore/xenstored_core.h b/tools/xenstore/xenstored_core.h index e017bddf42..c50936f1bc 100644 --- a/tools/xenstore/xenstored_core.h +++ b/tools/xenstore/xenstored_core.h @@ -31,14 +31,19 @@ struct buffered_data { + struct list_head list; + /* Are we still doing the header? */ bool inhdr; + /* How far are we? */ unsigned int used; + union { struct xsd_sockmsg msg; char raw[sizeof(struct xsd_sockmsg)]; } hdr; + /* The actual data. */ char *buffer; }; @@ -47,14 +52,6 @@ struct connection; typedef int connwritefn_t(struct connection *, const void *, unsigned int); typedef int connreadfn_t(struct connection *, void *, unsigned int); -enum state -{ - /* Doing action, not listening */ - BUSY, - /* Completed */ - OK, -}; - struct connection { struct list_head list; @@ -62,12 +59,9 @@ struct connection /* The file descriptor we came in on. */ int fd; - /* Who am I? 0 for socket connections. */ + /* Who am I? 0 for socket connections. */ domid_t id; - /* Blocked on transaction? Busy? */ - enum state state; - /* Is this a read-only connection? */ bool can_write; @@ -75,10 +69,7 @@ struct connection struct buffered_data *in; /* Buffered output data */ - struct buffered_data *out; - - /* If we had a watch fire outgoing when we needed to reply... */ - struct buffered_data *waiting_reply; + struct list_head out_list; /* My transaction, if any. */ struct transaction *transaction; @@ -172,3 +163,13 @@ void trace(const char *fmt, ...); extern int event_fd; #endif /* _XENSTORED_CORE_H */ + +/* + * Local variables: + * c-file-style: "linux" + * indent-tabs-mode: t + * c-indent-level: 8 + * c-basic-offset: 8 + * tab-width: 8 + * End: + */ diff --git a/tools/xenstore/xenstored_domain.c b/tools/xenstore/xenstored_domain.c index a7cd4493ad..15c98bfb49 100644 --- a/tools/xenstore/xenstored_domain.c +++ b/tools/xenstore/xenstored_domain.c @@ -276,12 +276,14 @@ void handle_event(void) bool domain_can_read(struct connection *conn) { - return conn->state == OK && buffer_has_input(conn->domain->input); + return (list_empty(&conn->out_list) && + buffer_has_input(conn->domain->input)); } bool domain_can_write(struct connection *conn) { - return conn->out && buffer_has_output_room(conn->domain->output); + return (!list_empty(&conn->out_list) && + buffer_has_output_room(conn->domain->output)); } static struct domain *new_domain(void *context, domid_t domid, diff --git a/tools/xenstore/xenstored_watch.c b/tools/xenstore/xenstored_watch.c index dfb0cd1a3e..6f3c2e4e03 100644 --- a/tools/xenstore/xenstored_watch.c +++ b/tools/xenstore/xenstored_watch.c @@ -32,17 +32,6 @@ #include "xenstored_test.h" #include "xenstored_domain.h" -/* FIXME: time out unacked watches. */ -struct watch_event -{ - /* The events on this watch. */ - struct list_head list; - - /* Data to send (node\0token\0). */ - unsigned int len; - char *data; -}; - struct watch { /* Watches on this connection */ @@ -58,50 +47,17 @@ struct watch char *node; }; -/* Look through our watches: if any of them have an event, queue it. */ -void queue_next_event(struct connection *conn) -{ - struct watch_event *event; - struct watch *watch; - - /* We had a reply queued already? Send it: other end will - * discard watch. */ - if (conn->waiting_reply) { - conn->out = conn->waiting_reply; - conn->waiting_reply = NULL; - return; - } - - list_for_each_entry(watch, &conn->watches, list) { - event = list_top(&watch->events, struct watch_event, list); - if (event) { - list_del(&event->list); - talloc_free(event); - send_reply(conn,XS_WATCH_EVENT,event->data,event->len); - break; - } - } -} - -static int destroy_watch_event(void *_event) -{ - struct watch_event *event = _event; - - trace_destroy(event, "watch_event"); - return 0; -} - static void add_event(struct connection *conn, struct watch *watch, const char *name) { - struct watch_event *event; + /* Data to send (node\0token\0). */ + unsigned int len; + char *data; if (!check_event_node(name)) { /* Can this conn load node, or see that it doesn't exist? */ - struct node *node; - - node = get_node(conn, name, XS_PERM_READ); + struct node *node = get_node(conn, name, XS_PERM_READ); if (!node && errno != ENOENT) return; } @@ -112,14 +68,12 @@ static void add_event(struct connection *conn, name++; } - event = talloc(watch, struct watch_event); - event->len = strlen(name) + 1 + strlen(watch->token) + 1; - event->data = talloc_array(event, char, event->len); - strcpy(event->data, name); - strcpy(event->data + strlen(name) + 1, watch->token); - talloc_set_destructor(event, destroy_watch_event); - list_add_tail(&event->list, &watch->events); - trace_create(event, "watch_event"); + len = strlen(name) + 1 + strlen(watch->token) + 1; + data = talloc_array(watch, char, len); + strcpy(data, name); + strcpy(data + strlen(name) + 1, watch->token); + send_reply(conn, XS_WATCH_EVENT, data, len); + talloc_free(data); } /* FIXME: we fail to fire on out of memory. Should drop connections. */ @@ -139,11 +93,6 @@ void fire_watches(struct connection *conn, const char *name, bool recurse) add_event(i, watch, name); else if (recurse && is_child(watch->node, name)) add_event(i, watch, watch->node); - else - continue; - /* If connection not doing anything, queue this. */ - if (i->state == OK) - queue_next_event(i); } } } @@ -231,13 +180,19 @@ void do_unwatch(struct connection *conn, struct buffered_data *in) void dump_watches(struct connection *conn) { struct watch *watch; - struct watch_event *event; - list_for_each_entry(watch, &conn->watches, list) { + list_for_each_entry(watch, &conn->watches, list) printf(" watch on %s token %s\n", watch->node, watch->token); - list_for_each_entry(event, &watch->events, list) - printf(" event: %s\n", event->data); - } } #endif + +/* + * Local variables: + * c-file-style: "linux" + * indent-tabs-mode: t + * c-indent-level: 8 + * c-basic-offset: 8 + * tab-width: 8 + * End: + */ diff --git a/tools/xenstore/xenstored_watch.h b/tools/xenstore/xenstored_watch.h index ed8892c4aa..2eccff9476 100644 --- a/tools/xenstore/xenstored_watch.h +++ b/tools/xenstore/xenstored_watch.h @@ -23,17 +23,9 @@ #include "xenstored_core.h" void do_watch(struct connection *conn, struct buffered_data *in); -void do_watch_ack(struct connection *conn, const char *token); void do_unwatch(struct connection *conn, struct buffered_data *in); -/* Is this a watch event message for this connection? */ -bool is_watch_event(struct connection *conn, struct buffered_data *out); - -/* Look through our watches: if any of them have an event, queue it. */ -void queue_next_event(struct connection *conn); - -/* Fire all watches: recurse means all the children are affected (ie. rm). - */ +/* Fire all watches: recurse means all the children are affected (ie. rm). */ void fire_watches(struct connection *conn, const char *name, bool recurse); void dump_watches(struct connection *conn); diff --git a/xen/include/public/io/xs_wire.h b/xen/include/public/io/xs_wire.h index 3c26cc0d80..4ab5633b8a 100644 --- a/xen/include/public/io/xs_wire.h +++ b/xen/include/public/io/xs_wire.h @@ -30,23 +30,23 @@ enum xsd_sockmsg_type { - XS_DEBUG, - XS_DIRECTORY, - XS_READ, - XS_GET_PERMS, - XS_WATCH, - XS_UNWATCH, - XS_TRANSACTION_START, - XS_TRANSACTION_END, - XS_INTRODUCE, - XS_RELEASE, - XS_GET_DOMAIN_PATH, - XS_WRITE, - XS_MKDIR, - XS_RM, - XS_SET_PERMS, - XS_WATCH_EVENT, - XS_ERROR, + XS_DEBUG, + XS_DIRECTORY, + XS_READ, + XS_GET_PERMS, + XS_WATCH, + XS_UNWATCH, + XS_TRANSACTION_START, + XS_TRANSACTION_END, + XS_INTRODUCE, + XS_RELEASE, + XS_GET_DOMAIN_PATH, + XS_WRITE, + XS_MKDIR, + XS_RM, + XS_SET_PERMS, + XS_WATCH_EVENT, + XS_ERROR, }; #define XS_WRITE_NONE "NONE" @@ -56,38 +56,40 @@ enum xsd_sockmsg_type /* We hand errors as strings, for portability. */ struct xsd_errors { - int errnum; - const char *errstring; + int errnum; + const char *errstring; }; #define XSD_ERROR(x) { x, #x } static struct xsd_errors xsd_errors[] __attribute__((unused)) = { - XSD_ERROR(EINVAL), - XSD_ERROR(EACCES), - XSD_ERROR(EEXIST), - XSD_ERROR(EISDIR), - XSD_ERROR(ENOENT), - XSD_ERROR(ENOMEM), - XSD_ERROR(ENOSPC), - XSD_ERROR(EIO), - XSD_ERROR(ENOTEMPTY), - XSD_ERROR(ENOSYS), - XSD_ERROR(EROFS), - XSD_ERROR(EBUSY), - XSD_ERROR(EAGAIN), - XSD_ERROR(EISCONN), + XSD_ERROR(EINVAL), + XSD_ERROR(EACCES), + XSD_ERROR(EEXIST), + XSD_ERROR(EISDIR), + XSD_ERROR(ENOENT), + XSD_ERROR(ENOMEM), + XSD_ERROR(ENOSPC), + XSD_ERROR(EIO), + XSD_ERROR(ENOTEMPTY), + XSD_ERROR(ENOSYS), + XSD_ERROR(EROFS), + XSD_ERROR(EBUSY), + XSD_ERROR(EAGAIN), + XSD_ERROR(EISCONN), }; struct xsd_sockmsg { - u32 type; - u32 len; /* Length of data following this. */ + u32 type; /* XS_??? */ + u32 req_id;/* Request identifier, echoed in daemon's response. */ + u32 tx_id; /* Transaction id (0 if not related to a transaction). */ + u32 len; /* Length of data following this. */ - /* Generally followed by nul-terminated string(s). */ + /* Generally followed by nul-terminated string(s). */ }; enum xs_watch_type { - XS_WATCH_PATH = 0, - XS_WATCH_TOKEN, + XS_WATCH_PATH = 0, + XS_WATCH_TOKEN, }; #endif /* _XS_WIRE_H */ -- 2.30.2